/******************************************************************************
 * This example                                                               *
 * - emulates a client launching a request every 10-300ms                     *
 * - uses a CURL-backend consisting of a master and 10 workers                *
 * - runs until it is shut down by a CTRL+C signal                            *
 *                                                                            *
 *                                                                            *
 * Schematic view:                                                            *
 *                                                                            *
 *    client      |    client_job    |    curl_master    |    curl_worker     *
 *          /--------------|*|-------------\       /-------------|*|          *
 *         /---------------|*|--------------\     /                           *
 *        /----------------|*|---------------\   /                            *
 *     |*| ----------------|*|----------------|*|----------------|*|          *
 *        \________________|*|_______________/   \                            *
 *         \_______________|*|______________/     \                           *
 *          \______________|*|_____________/       \-------------|*|          *
 *                                                                            *
 *                                                                            *
 * Communication pattern:                                                     *
 *                                                                            *
 *        client_job      curl_master        curl_worker                      *
 *          |                  |                  |                           *
 *          | ----(read)-----> |                  |                           *
 *          |                  | --(forward)----> |                           *
 *          |                                     |---\                       *
 *          |                                     |   |                       *
 *          |                                     |<--/                       *
 *          | <-------------(reply)-------------- |                           *
 *          X                                                                 *
 ******************************************************************************/

// C includes
#include <csignal>
#include <cstdlib>
#include <ctime>

// C++ includes
#include <iostream>
#include <random>
#include <string>
#include <vector>

// CAF
#include "caf/all.hpp"
#include "caf/io/all.hpp"

CAF_PUSH_WARNINGS
#include <curl/curl.h>
CAF_POP_WARNINGS

// disable some clang warnings here caused by CURL macros
#ifdef __clang__
#  pragma clang diagnostic ignored "-Wshorten-64-to-32"
#  pragma clang diagnostic ignored "-Wdisabled-macro-expansion"
#  pragma clang diagnostic ignored "-Wunused-const-variable"
#endif // __clang__

// Type-ID block for this example. It lists the one non-builtin payload type
// and the atoms that the actors below exchange. (Presumably required so that
// CAF can put these types into messages -- see the CAF type-ID documentation.)
CAF_BEGIN_TYPE_ID_BLOCK(curl_fuse, first_custom_type_id)

  // payload type of the reply message (aliased as buffer_type below)
  CAF_ADD_TYPE_ID(curl_fuse, (std::vector<char>) )

  // client_job -> curl_master -> curl_worker: download request
  CAF_ADD_ATOM(curl_fuse, read_atom)
  // handled by client_job as "request failed"; no actor in this file sends it
  CAF_ADD_ATOM(curl_fuse, fail_atom)
  // client -> client: tick that triggers spawning the next client_job
  CAF_ADD_ATOM(curl_fuse, next_atom)
  // curl_worker -> client_job: tags the message carrying the downloaded bytes
  CAF_ADD_ATOM(curl_fuse, reply_atom)
  // curl_worker -> curl_master: the worker completed its job
  CAF_ADD_ATOM(curl_fuse, finished_atom)

CAF_END_TYPE_ID_BLOCK(curl_fuse)

using namespace caf;

// Byte buffer that collects the body of a download; it is filled by
// curl_state::callback and shipped to the client job in the reply message.
using buffer_type = std::vector<char>;

namespace color {

// Escape sequences for coloring terminal output. Every constant is a
// null-terminated character array that can be streamed as-is.

// switch back to the terminal default (optionally followed by a line break)
constexpr char reset[]{"\033[0m"};
constexpr char reset_endl[]{"\033[0m\n"};

// regular foreground colors
constexpr char black[]{"\033[30m"};
constexpr char red[]{"\033[31m"};
constexpr char green[]{"\033[32m"};
constexpr char yellow[]{"\033[33m"};
constexpr char blue[]{"\033[34m"};
constexpr char magenta[]{"\033[35m"};
constexpr char cyan[]{"\033[36m"};
constexpr char white[]{"\033[37m"};

// bold variants: the bold attribute followed by the foreground color
constexpr char bold_black[]{"\033[1m\033[30m"};
constexpr char bold_red[]{"\033[1m\033[31m"};
constexpr char bold_green[]{"\033[1m\033[32m"};
constexpr char bold_yellow[]{"\033[1m\033[33m"};
constexpr char bold_blue[]{"\033[1m\033[34m"};
constexpr char bold_magenta[]{"\033[1m\033[35m"};
constexpr char bold_cyan[]{"\033[1m\033[36m"};
constexpr char bold_white[]{"\033[1m\033[37m"};

} // namespace color

// size of the worker pool spawned by the CURL master
constexpr size_t num_curl_workers{10};

// lower bound (milliseconds) for the random pause between two HTTP requests
constexpr int min_req_interval{10};

// upper bound (milliseconds) for the random pause between two HTTP requests
constexpr int max_req_interval{300};

// put everything into anonymous namespace (except main)
namespace {

// Common base for the state of every actor in this example. It remembers the
// owning actor and a terminal color and offers a colored output stream that
// is prefixed with the actor's name and ID.
struct base_state {
  base_state(local_actor* thisptr) : self(thisptr) {
    // nop
  }

  // Announces the end of the state's lifetime with a final "done" line.
  virtual ~base_state() {
    print() << "done" << color::reset_endl;
  }

  // Stores the color for all subsequent output and announces the startup.
  // This implementation always returns true; overrides may return false to
  // signal that the actor cannot run.
  virtual bool init(std::string m_color) {
    color = std::move(m_color);
    print() << "started" << color::reset_endl;
    return true;
  }

  // Returns an output stream to which the color code and the prefix
  // "<name> (id = <id>): " have already been written.
  actor_ostream print() {
    actor_ostream out = aout(self);
    out << color << self->name() << " (id = " << self->id() << "): ";
    return out;
  }

  // the actor owning this state
  local_actor* self;
  // escape sequence emitted in front of every line (empty until init() ran)
  std::string color;
};

// State of a single client job; adds nothing but a name to the base state.
struct client_job_state : base_state {
  // reuse the base_state constructor
  using base_state::base_state;

  // presumably picked up by CAF as the actor's name (shown by print())
  static inline const char* name = "curl.client-job";
};

// Represents exactly one HTTP request: sends a read request for the byte
// range 0-4095 of a fixed URL to `parent` and quits as soon as either the
// reply or a failure notice arrives.
behavior client_job(stateful_actor<client_job_state>* self,
                    const actor& parent) {
  const bool started = self->state.init(color::blue);
  if (!started) {
    // an empty behavior makes the actor terminate right away
    return {};
  }
  // hand the request to the master, which forwards it to a worker
  self->send(parent, read_atom_v, "http://www.example.com/index.html",
             uint64_t{0}, uint64_t{4095});
  return {
    // success: report the size of the payload, then terminate
    [self](reply_atom, const buffer_type& payload) {
      auto out = self->state.print();
      out << "successfully received " << payload.size() << " bytes"
          << color::reset_endl;
      self->quit();
    },
    // failure: report it, then terminate
    [self](fail_atom) {
      auto out = self->state.print();
      out << "failure" << color::reset_endl;
      self->quit();
    },
  };
}

struct client_state : base_state {
  client_state(local_actor* selfptr)
    : base_state(selfptr),
      count(0),
      re(rd()),
      dist(min_req_interval, max_req_interval) {
    // nop
  }

  size_t count;
  std::random_device rd;
  std::default_random_engine re;
  std::uniform_int_distribution<int> dist;
  static inline const char* name = "curl.client";
};

// Keeps spawning client jobs: each `next_atom` tick launches one job and
// schedules the following tick after a random delay.
behavior client(stateful_actor<client_state>* self, const actor& parent) {
  // link to the master before doing anything else
  self->link_to(parent);
  if (const bool started = self->state.init(color::green); !started)
    return {}; // an empty behavior terminates the actor
  // first tick: start the spawn loop immediately
  self->send(self, next_atom_v);
  return {
    [self, parent](next_atom) {
      auto& state = self->state;
      const auto job_nr = ++state.count;
      state.print() << "spawn new client_job (nr. " << job_nr << ")"
                    << color::reset_endl;
      // the job performs I/O, hence it gets its own thread
      self->spawn<detached + linked>(client_job, parent);
      // pick a random pause and schedule the next tick
      const std::chrono::milliseconds pause{state.dist(state.re)};
      self->delayed_send(self, pause, next_atom_v);
    },
  };
}

// State of a CURL worker: owns one libcurl easy handle plus the buffer that
// collects the body of the transfer currently in flight.
struct curl_state : base_state {
  curl_state(local_actor* selfptr) : base_state(selfptr) {
    // nop
  }

  // Releases the easy handle if init() managed to create one.
  ~curl_state() override {
    if (curl != nullptr)
      curl_easy_cleanup(curl);
  }

  // Write callback registered with libcurl via CURLOPT_WRITEFUNCTION.
  //
  // data:  start of the received chunk
  // bsize: size of one element (libcurl documents this as always 1)
  // nmemb: number of elements, i.e., the chunk holds bsize * nmemb bytes
  // userp: the curl_state registered via CURLOPT_WRITEDATA
  //
  // Appends the chunk to `buf` and returns the number of bytes consumed;
  // libcurl treats any other return value as a write error.
  static size_t callback(void* data, size_t bsize, size_t nmemb, void* userp) {
    const size_t size = bsize * nmemb;
    auto& buf = static_cast<curl_state*>(userp)->buf;
    auto first = static_cast<const char*>(data);
    // Copy the whole chunk. Using `first + bsize` as end would store only a
    // single byte per chunk while still reporting `size` bytes as consumed.
    buf.insert(buf.end(), first, first + size);
    return size;
  }

  // Creates and configures the easy handle, then runs the base
  // initialization. Returns false if libcurl cannot provide a handle.
  bool init(std::string m_color) override {
    curl = curl_easy_init();
    if (curl == nullptr)
      return false;
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &curl_state::callback);
    // curl_easy_setopt is variadic and expects a `long` for this option, so
    // the literal must be 1L rather than an int
    curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
    return base_state::init(std::move(m_color));
  }

  // easy handle of this worker; nullptr until init() succeeded
  CURL* curl = nullptr;
  // body of the current download, filled by callback()
  buffer_type buf;
  static inline const char* name = "curl.worker";
};

// Serves download requests using a private CURL session. A request is
// retried every 100ms until the server answers with HTTP 200 or 206; only
// then the worker notifies `parent` and replies with the received bytes.
behavior curl_worker(stateful_actor<curl_state>* self, const actor& parent) {
  const bool session_ready = self->state.init(color::yellow);
  if (!session_ready)
    return {}; // an empty behavior terminates the actor
  return {[self, parent](read_atom, const std::string& fname, uint64_t offset,
                         uint64_t range) -> message {
    auto& state = self->state;
    state.print() << "read" << color::reset_endl;
    while (true) {
      // drop whatever a previous (failed) attempt left behind
      state.buf.clear();
      // configure the transfer: URL, byte range "<offset>-<range>", and the
      // object handed to the write callback
      curl_easy_setopt(state.curl, CURLOPT_URL, fname.c_str());
      std::ostringstream range_spec;
      range_spec << offset << "-" << range;
      curl_easy_setopt(state.curl, CURLOPT_RANGE, range_spec.str().c_str());
      curl_easy_setopt(state.curl, CURLOPT_WRITEDATA,
                       static_cast<void*>(&state));
      // run the transfer (blocks until it completed or failed)
      const auto status = curl_easy_perform(state.curl);
      if (status == CURLE_OK) {
        long http_code = 0;
        curl_easy_getinfo(state.curl, CURLINFO_RESPONSE_CODE, &http_code);
        if (http_code == 200 || http_code == 206) {
          // ok / partial content: we are done
          state.print() << "received " << state.buf.size()
                        << " bytes with 'HTTP RETURN CODE': " << http_code
                        << color::reset_endl;
          // let the master know that this worker is available again
          self->send(parent, finished_atom_v);
          return make_message(reply_atom_v, std::move(state.buf));
        }
        if (http_code == 404) {
          // file does not exist
          state.print() << "http error: download failed with "
                        << "'HTTP RETURN CODE': 404 (file does "
                        << "not exist!)" << color::reset_endl;
        } else {
          state.print() << "http error: download failed with "
                        << "'HTTP RETURN CODE': " << http_code
                        << color::reset_endl;
        }
      } else {
        state.print() << "curl_easy_perform() failed: "
                      << curl_easy_strerror(status) << color::reset_endl;
      }
      // throttle retries to avoid spinning if the remote side is unreachable
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
  }};
}

struct master_state : base_state {
  master_state(local_actor* selfptr) : base_state(selfptr) {
    // nop
  }
  std::vector<actor> idle;
  std::vector<actor> busy;
  static inline const char* name = "curl.master";
};

// Owns the pool of CURL workers and dispatches incoming read requests.
//
// Each read request is delegated to an idle worker, i.e., the worker replies
// to the original requester directly. Workers announce completion with a
// `finished_atom`, which moves them from `busy` back to `idle`. While no
// worker is idle, the master switches to a behavior that only accepts
// `finished_atom`.
behavior curl_master(stateful_actor<master_state>* self) {
  if (!self->state.init(color::magenta))
    return {}; // returning an empty behavior terminates the actor
  // Keep unmatched messages in the mailbox instead of dropping them. Without
  // this, read requests that arrive while all workers are busy do not match
  // the temporary behavior below and would be discarded by CAF's default
  // handler (print-and-drop, per the CAF manual) rather than wait.
  self->set_default_handler(skip);
  // spawn workers
  for (size_t i = 0; i < num_curl_workers; ++i)
    self->state.idle.push_back(
      self->spawn<detached + linked>(curl_worker, self));
  // moves the sender of the current message from `busy` to `idle`; ignores
  // senders that are not registered as busy
  auto worker_finished = [=] {
    auto& st = self->state;
    auto sender = self->current_sender();
    auto last = st.busy.end();
    auto i = std::find(st.busy.begin(), last, sender);
    if (i == last)
      return;
    st.idle.push_back(*i);
    st.busy.erase(i);
    st.print() << "worker is done" << color::reset_endl;
  };
  self->state.print() << "spawned " << self->state.idle.size() << " worker(s)"
                      << color::reset_endl;
  return {
    [=](read_atom rd, std::string& str, uint64_t x, uint64_t y) {
      auto& st = self->state;
      st.print() << "received {'read'}" << color::reset_endl;
      // forward job to an idle worker
      actor worker = st.idle.back();
      st.idle.pop_back();
      st.busy.push_back(worker);
      self->delegate(worker, rd, std::move(str), x, y);
      st.print() << st.busy.size() << " active jobs" << color::reset_endl;
      if (st.idle.empty()) {
        // wait until at least one worker finished its job
        self->become(keep_behavior, [=](finished_atom) {
          worker_finished();
          // Resume only if a worker actually became available; otherwise
          // the read handler would call back() on an empty vector.
          if (!self->state.idle.empty())
            self->unbecome();
        });
      }
    },
    [=](finished_atom) { worker_finished(); },
  };
}

// signal handling for ctrl+c: set to true by the SIGINT handler installed in
// caf_main and polled by caf_main's wait loop
std::atomic<bool> shutdown_flag{false};

} // namespace

// Entry point of the example: installs a SIGINT handler, starts the CURL
// master and the client, waits for CTRL+C, and then shuts everything down.
// A second CTRL+C during shutdown aborts the process.
void caf_main(actor_system& system) {
  // install signal handler; value-initialize the struct first, because
  // sigaction() also reads sa_mask and sa_flags and they must not hold
  // indeterminate values
  struct sigaction act = {};
  sigemptyset(&act.sa_mask);
  act.sa_flags = 0;
  act.sa_handler = [](int) { shutdown_flag = true; };
  auto set_sighandler = [&] {
    if (sigaction(SIGINT, &act, nullptr) != 0) {
      std::cerr << "fatal: cannot set signal handler" << std::endl;
      abort();
    }
  };
  set_sighandler();
  // initialize CURL; no other libcurl function may be used if this fails
  if (curl_global_init(CURL_GLOBAL_DEFAULT) != CURLE_OK) {
    std::cerr << "fatal: cannot initialize CURL" << std::endl;
    return;
  }
  // get a scoped actor for the communication with our CURL actors
  scoped_actor self{system};
  // spawn client and curl_master
  auto master = self->spawn<detached>(curl_master);
  self->spawn<detached>(client, master);
  // poll CTRL+C flag every second
  while (!shutdown_flag)
    std::this_thread::sleep_for(std::chrono::seconds(1));
  aout(self) << color::cyan << "received CTRL+C" << color::reset_endl;
  // shutdown actors
  anon_send_exit(master, exit_reason::user_shutdown);
  // await actors; from now on, CTRL+C aborts the process immediately
  act.sa_handler = [](int) { abort(); };
  set_sighandler();
  aout(self) << color::cyan
             << "await CURL; this may take a while "
                "(press CTRL+C again to abort)"
             << color::reset_endl;
  self->await_all_other_actors_done();
  // shutdown CURL
  curl_global_cleanup();
}

// Presumably expands to a main() that sets up an actor system with the
// curl_fuse type-ID block and the I/O middleman module and then calls
// caf_main -- see the CAF documentation for CAF_MAIN.
CAF_MAIN(id_block::curl_fuse, io::middleman)
